{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "# Gotchas from Pandas to Dask\n",
    "\n",
    "This notebook highlights some key differences when transferring code from `Pandas` to run in a `Dask` environment.  \n",
    "Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dask version: 1.2.2\n",
      "Pandas version: 0.24.2\n"
     ]
    }
   ],
   "source": [
    "# Dask is under active development; this example was run with the versions printed below\n",
    "import dask\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "print(f'Dask version: {dask.__version__}')\n",
    "print(f'Pandas version: {pd.__version__}')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## Start Dask Client for Dashboard\n",
    "\n",
    "Starting the Dask Client is optional. In this example we run on a `LocalCluster`, which also provides a dashboard that is useful for gaining insight into the computation.  \n",
    "For additional information, see the [Dask Client documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup).  \n",
    "\n",
    "The link to the dashboard becomes visible when you create a client (as shown below).  \n",
    "When running within `Jupyter Lab`, an [extension](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3>Client</h3>\n",
       "<ul>\n",
       "  <li><b>Scheduler: </b>tcp://127.0.0.1:58069\n",
       "  <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3>Cluster</h3>\n",
       "<ul>\n",
       "  <li><b>Workers: </b>4</li>\n",
       "  <li><b>Cores: </b>4</li>\n",
       "  <li><b>Memory: </b>8.50 GB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: scheduler='tcp://127.0.0.1:58069' processes=4 cores=4>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dask.distributed import Client\n",
    "# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n",
    "client = Client()\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "See the [documentation for additional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "# Create 2 DataFrames for comparison: \n",
    "1. for Dask \n",
    "2. for Pandas  \n",
    "Dask comes with built-in sample datasets; we will use the `timeseries` sample for our example. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div><strong>Dask DataFrame Structure:</strong></div>\n",
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>npartitions=30</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01</th>\n",
       "      <td>int32</td>\n",
       "      <td>object</td>\n",
       "      <td>float64</td>\n",
       "      <td>float64</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-02</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-30</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-31</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>\n",
       "<div>Dask Name: make-timeseries, 30 tasks</div>"
      ],
      "text/plain": [
       "Dask DataFrame Structure:\n",
       "                   id    name        x        y\n",
       "npartitions=30                                 \n",
       "2000-01-01      int32  object  float64  float64\n",
       "2000-01-02        ...     ...      ...      ...\n",
       "...               ...     ...      ...      ...\n",
       "2000-01-30        ...     ...      ...      ...\n",
       "2000-01-31        ...     ...      ...      ...\n",
       "Dask Name: make-timeseries, 30 tasks"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf = dask.datasets.timeseries()\n",
    "ddf"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "* Remember that `Dask` is **lazy**, so to see a result we need to run [compute()](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.compute) \n",
    " (or `head()`, which calls `compute()` under the hood)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>0.229624</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      id      name         x         y\n",
       "timestamp                                             \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  0.229624\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575 -0.344598"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "#### Pandas Dataframe\n",
    "To create a `Pandas` dataframe we can call the `compute()` method on a `Dask dataframe`. This loads the full result into memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<class 'pandas.core.frame.DataFrame'>\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>0.229624</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      id      name         x         y\n",
       "timestamp                                             \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  0.229624\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575 -0.344598"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pdf = ddf.compute()  \n",
    "print(type(pdf))\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### dataframe.shape  \n",
    "We can also see *dask laziness* when using the shape attribute"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pandas shape: (2592000, 4)\n",
      "---------------------------\n",
      "Dask lazy shape: (Delayed('int-f7698154-62ad-435d-a7fa-ec5212b44a1c'), 4)\n"
     ]
    }
   ],
   "source": [
    "print(f'Pandas shape: {pdf.shape}')\n",
    "print('---------------------------')\n",
    "print(f'Dask lazy shape: {ddf.shape}')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We cannot get the full shape without accessing all the partitions. Running `len` does so:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dask computed shape: 2,592,000\n"
     ]
    }
   ],
   "source": [
    "print(f'Dask computed shape: {len(ddf.index):,}')  # expensive"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Creating a `Dask dataframe` from `Pandas`\n",
    "To use `Dask` capabilities on an existing `Pandas dataframe` (pdf), we need to convert it into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_pandas) function. \n",
    "You must supply either the number of partitions (`npartitions`) or the `chunksize` used to split the `Pandas dataframe`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div><strong>Dask DataFrame Structure:</strong></div>\n",
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>npartitions=10</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>int32</td>\n",
       "      <td>object</td>\n",
       "      <td>float64</td>\n",
       "      <td>float64</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-04 00:00:00</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-28 00:00:00</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-30 23:59:59</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>\n",
       "<div>Dask Name: from_pandas, 10 tasks</div>"
      ],
      "text/plain": [
       "Dask DataFrame Structure:\n",
       "                        id    name        x        y\n",
       "npartitions=10                                      \n",
       "2000-01-01 00:00:00  int32  object  float64  float64\n",
       "2000-01-04 00:00:00    ...     ...      ...      ...\n",
       "...                    ...     ...      ...      ...\n",
       "2000-01-28 00:00:00    ...     ...      ...      ...\n",
       "2000-01-30 23:59:59    ...     ...      ...      ...\n",
       "Dask Name: from_pandas, 10 tasks"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf2 = dd.from_pandas(pdf, npartitions=10)\n",
    "ddf2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## Partitions in Dask Dataframes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice that when we created a `Dask dataframe` we needed to supply the `npartitions` argument.  \n",
    "The number of partitions tells `Dask` how to break up the `Pandas dataframe` and parallelize the computation.  \n",
    "Each partition is a *separate* dataframe. For additional information see the [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions)  \n",
    "\n",
    "An example of this can be seen when examining the `reset_index()` method:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "timestamp    2000-01-01 00:00:00\n",
       "id                           972\n",
       "name                       Alice\n",
       "x                       0.532136\n",
       "y                       0.229624\n",
       "Name: 0, dtype: object"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pdf2 = pdf.reset_index()\n",
    "# Pandas: index label 0 is unique, so only 1 row is returned\n",
    "pdf2.loc[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\distributed\\worker.py:3101: UserWarning: Large object of size 9.42 MB detected in task graph: \n",
      "  (slice(0, 0, None), None, 'reset_index-8356d669ad1 ... 54123c422adde')\n",
      "Consider scattering large objects ahead of time\n",
      "with client.scatter to reduce scheduler burden and \n",
      "keep data on workers\n",
      "\n",
      "    future = client.submit(func, big_data)    # bad\n",
      "\n",
      "    big_future = client.scatter(big_data)     # good\n",
      "    future = client.submit(func, big_future)  # good\n",
      "  % (format_bytes(len(b)), s)\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>timestamp</th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-01</td>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>0.229624</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-04</td>\n",
       "      <td>994</td>\n",
       "      <td>Ingrid</td>\n",
       "      <td>0.380598</td>\n",
       "      <td>-0.739793</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-07</td>\n",
       "      <td>974</td>\n",
       "      <td>Bob</td>\n",
       "      <td>0.927427</td>\n",
       "      <td>0.390628</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-10</td>\n",
       "      <td>1016</td>\n",
       "      <td>Wendy</td>\n",
       "      <td>-0.485949</td>\n",
       "      <td>-0.738365</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-13</td>\n",
       "      <td>973</td>\n",
       "      <td>Xavier</td>\n",
       "      <td>0.708157</td>\n",
       "      <td>-0.507682</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-16</td>\n",
       "      <td>966</td>\n",
       "      <td>Edith</td>\n",
       "      <td>0.161352</td>\n",
       "      <td>-0.822572</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-19</td>\n",
       "      <td>971</td>\n",
       "      <td>Yvonne</td>\n",
       "      <td>-0.193811</td>\n",
       "      <td>0.302984</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-22</td>\n",
       "      <td>976</td>\n",
       "      <td>Yvonne</td>\n",
       "      <td>-0.829687</td>\n",
       "      <td>-0.782505</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-25</td>\n",
       "      <td>1026</td>\n",
       "      <td>Laura</td>\n",
       "      <td>-0.597932</td>\n",
       "      <td>-0.762958</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2000-01-28</td>\n",
       "      <td>958</td>\n",
       "      <td>Ingrid</td>\n",
       "      <td>0.773340</td>\n",
       "      <td>0.877205</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   timestamp    id    name         x         y\n",
       "0 2000-01-01   972   Alice  0.532136  0.229624\n",
       "0 2000-01-04   994  Ingrid  0.380598 -0.739793\n",
       "0 2000-01-07   974     Bob  0.927427  0.390628\n",
       "0 2000-01-10  1016   Wendy -0.485949 -0.738365\n",
       "0 2000-01-13   973  Xavier  0.708157 -0.507682\n",
       "0 2000-01-16   966   Edith  0.161352 -0.822572\n",
       "0 2000-01-19   971  Yvonne -0.193811  0.302984\n",
       "0 2000-01-22   976  Yvonne -0.829687 -0.782505\n",
       "0 2000-01-25  1026   Laura -0.597932 -0.762958\n",
       "0 2000-01-28   958  Ingrid  0.773340  0.877205"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf2 = ddf2.reset_index()\n",
    "# Dask: each partition has its own index starting at 0, so loc[0] returns one row per partition\n",
    "ddf2.loc[0].compute() "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Dask Dataframe vs Pandas Dataframe\n",
    "Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe, we can start to compare how we interact with them."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Conceptual shift - from Update to Insert/Delete\n",
    "Dask does not update data in place, so arguments such as `inplace=True`, which exist in Pandas, are not available.  \n",
    "For more details see [issue #653 on GitHub](https://github.com/dask/dask/issues/653)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Rename Columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* Using `inplace=True` is not considered *best practice*. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Index(['id', 'name', 'x', 'y'], dtype='object')\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Index(['ID', 'name', 'x', 'y'], dtype='object')"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Pandas \n",
    "print(pdf.columns)\n",
    "# pdf.rename(columns={'id':'ID'}, inplace=True)\n",
    "pdf = pdf.rename(columns={'id':'ID'})\n",
    "pdf.columns"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "# Dask - Error\n",
    "# ddf.rename(columns={'id':'ID'}, inplace=True)\n",
    "# ddf.columns\n",
    "\n",
    "'''  python\n",
    "---------------------------------------------------------------------------  \n",
    "TypeError                                 Traceback (most recent call last)  \n",
    "<ipython-input-12-3e70ff3a549e> in <module>  \n",
    "      1 # Dask - Error  \n",
    "----> 2 ddf.rename(columns={'id':'ID'}, inplace=True)  \n",
    "      3 ddf.columns  \n",
    "TypeError: rename() got an unexpected keyword argument 'inplace'  \n",
    "'''"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Index(['id', 'name', 'x', 'y'], dtype='object')\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "Index(['ID', 'name', 'x', 'y'], dtype='object')"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Dask\n",
    "print(ddf.columns)\n",
    "ddf = ddf.rename(columns={'id':'ID'})\n",
    "ddf.columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Data manipulations  \n",
    "There are several differences when manipulating data.  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### loc - Pandas"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:17</th>\n",
       "      <td>1035</td>\n",
       "      <td>Norbert</td>\n",
       "      <td>0.599016</td>\n",
       "      <td>-63.052837</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                       ID     name         x          y\n",
       "timestamp                                              \n",
       "2000-01-01 00:00:00   972    Alice  0.532136  22.962368\n",
       "2000-01-01 00:00:17  1035  Norbert  0.599016 -63.052837"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)\n",
    "pdf.loc[cond_pdf, ['y']] = pdf['y']* 100\n",
    "pdf[cond_pdf].head(2)"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "# Error\n",
    "# cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n",
    "# ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n",
    "# ddf[cond_ddf].head(2)\n",
    "\n",
    "'''\n",
    "> TypeError                                 Traceback (most recent call last)  \n",
    "> <ipython-input-16-2bbb2ae570bd> in <module> \n",
    ">      1 # Daske - Error\n",
    ">      2 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n",
    ">----> 3 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100\n",
    ">      4 ddf[cond_ddf].head(2)\n",
    "> TypeError: '_LocIndexer' object does not support item assignment  \n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dask - use mask/where"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>2296.236839</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x            y\n",
       "timestamp                                                \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  2296.236839\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575    -0.344598"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
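    "# Pandas\n",
    "# note: `y` was already scaled by the loc assignment above, so matching rows are now scaled twice\n",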
    "pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y\n",
       "timestamp                                              \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#Dask\n",
    "cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)\n",
    "ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For more information see [dask mask documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.mask)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Meta argument\n",
    "One key feature in `Dask` is the introduction of `meta` arguement.  \n",
    "> `meta` is the prescription of the names/types of the output from the computation  \n",
    "from [stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)\n",
    "\n",
    "Since `Dask` creates a DAG for the computation, it requires to understand what are the outputs of each calculation stage.  \n",
    "For additinal information see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>2296.236839</td>\n",
       "      <td>Al</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x            y initials\n",
       "timestamp                                                         \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  2296.236839       Al\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575    -0.344598       Pa"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py:2345: UserWarning: \n",
      "You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n",
      "To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n",
      "  Before: .apply(func)\n",
      "  After:  .apply(func, meta=('name', 'object'))\n",
      "\n",
      "  warnings.warn(meta_warning(meta))\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>Al</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y initials\n",
       "timestamp                                                       \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368       Al\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598       Pa"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Dask - Warning\n",
    "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Introducing meta argument"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Describe the outcome type of the calculation\n",
    "meta_arg = pd.Series(object, name='initials')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>Al</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y initials\n",
       "timestamp                                                       \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368       Al\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598       Pa"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta = meta_arg)\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "# similar when using a function\n",
    "def func(row):\n",
    "    if (row['x']> 0):\n",
    "        return row['x'] * 1000  \n",
    "    else:\n",
    "        return row['y'] * -1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "      <th>z</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>Al</td>\n",
       "      <td>532.136479</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "      <td>4.574854</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y initials           z\n",
       "timestamp                                                                   \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368       Al  532.136479\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598       Pa    4.574854"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))\n",
    "ddf.head(2)"
   ]
  },
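  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `meta` values used above can be written in several forms. The sketch below is illustrative rather than part of the original workflow: it only builds the objects with `Pandas` (the names `meta_tuple`, `meta_series`, `meta_dict` and `meta_frame` are made up for this example), and any of them could then be passed as `meta=` to a `Dask` call.\n",
    "\n",
    "```python\n",
    "import pandas as pd\n",
    "\n",
    "# Series output: a (name, dtype) tuple or an empty pandas Series\n",
    "meta_tuple = ('initials', 'object')\n",
    "meta_series = pd.Series(dtype='object', name='initials')\n",
    "\n",
    "# DataFrame output: a {column: dtype} dict or an empty pandas DataFrame\n",
    "meta_dict = {'x': 'f8', 'dist': 'f8'}\n",
    "meta_frame = pd.DataFrame({col: pd.Series(dtype=dt) for col, dt in meta_dict.items()})\n",
    "\n",
    "# Both pandas objects carry names and dtypes but no rows\n",
    "print(meta_series.name, meta_series.dtype, len(meta_series))\n",
    "print(meta_frame.dtypes.to_dict(), len(meta_frame))\n",
    "```\n",
    "\n",
    "The tuple and dict forms are shorter to type; an empty `Pandas` object can be convenient when the description is derived from existing data."
   ]
  },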
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Map partitions\n",
    "* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://dask.readthedocs.io/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) method.   \n",
    "Mainly useful for functions that are not implemented in `Dask` or `Pandas` . \n",
    "* Finally we can return a new `dataframe` which needs to be described in the `meta` argument  \n",
    "The function could also include arguments."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>dist</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>23.312937</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:02</th>\n",
       "      <td>1025</td>\n",
       "      <td>Edith</td>\n",
       "      <td>-0.195473</td>\n",
       "      <td>-0.975117</td>\n",
       "      <td>0.661493</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:03</th>\n",
       "      <td>974</td>\n",
       "      <td>Quinn</td>\n",
       "      <td>-0.320977</td>\n",
       "      <td>-0.465554</td>\n",
       "      <td>0.524791</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:04</th>\n",
       "      <td>1000</td>\n",
       "      <td>Kevin</td>\n",
       "      <td>0.838521</td>\n",
       "      <td>0.253437</td>\n",
       "      <td>1.364325</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                       ID      name         x          y       dist\n",
       "timestamp                                                          \n",
       "2000-01-01 00:00:00   972     Alice  0.532136  22.962368        NaN\n",
       "2000-01-01 00:00:01   964  Patricia  0.004575  -0.344598  23.312937\n",
       "2000-01-01 00:00:02  1025     Edith -0.195473  -0.975117   0.661493\n",
       "2000-01-01 00:00:03   974     Quinn -0.320977  -0.465554   0.524791\n",
       "2000-01-01 00:00:04  1000     Kevin  0.838521   0.253437   1.364325"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import numpy as np\n",
    "def func2(df, coor_x, coor_y, drop_cols):\n",
    "    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  \n",
    "                           +  (df[coor_y] - df[coor_y].shift())**2 )\n",
    "    return df.drop(drop_cols, axis=1)\n",
    "\n",
    "ddf2 = ddf.map_partitions(func2\n",
    "                          , coor_x='x'\n",
    "                          , coor_y='y'\n",
    "                          , drop_cols=['initials', 'z']\n",
    "                          , meta=pd.DataFrame({'ID':'i8'\n",
    "                                              , 'name':str\n",
    "                                              , 'x':'f8'\n",
    "                                              , 'y':'f8'                                              \n",
    "                                              , 'dist':'f8'}, index=[0]))\n",
    "ddf2.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Convert index into Time column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>2296.236839</td>\n",
       "      <td>Al</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x            y initials     times\n",
       "timestamp                                                                   \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  2296.236839       Al  00:00:00\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575    -0.344598       Pa  00:00:01"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Only Pandas\n",
    "pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>initials</th>\n",
       "      <th>z</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>Al</td>\n",
       "      <td>532.136479</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>Pa</td>\n",
       "      <td>4.574854</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y initials           z  \\\n",
       "timestamp                                                                      \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368       Al  532.136479   \n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598       Pa    4.574854   \n",
       "\n",
       "                        times  \n",
       "timestamp                      \n",
       "2000-01-01 00:00:00  00:00:00  \n",
       "2000-01-01 00:00:01  00:00:01  "
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Dask or Pandas\n",
    "ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))\n",
    "# or  ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )\n",
    "ddf['times'] = ddf['times'].dt.time\n",
    "ddf =client.persist(ddf)\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Drop NA on column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# no issue with regular drop columns\n",
    "pdf = pdf.drop(labels=['initials'],axis=1)\n",
    "ddf = ddf.drop(labels=['initials','z'],axis=1) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pandas\n",
    "pdf = pdf.assign(colna = None)\n",
    "# Dask\n",
    "ddf = ddf.assign(colna = None)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>2296.236839</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x            y     times\n",
       "timestamp                                                          \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  2296.236839  00:00:00\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575    -0.344598  00:00:01"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pdf = pdf.dropna(axis=1, how='all')\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In odrer for `Dask` to drop a column with all `na` it must check all the partitions with `compute()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      ID      name         x          y     times\n",
       "timestamp                                                        \n",
       "2000-01-01 00:00:00  972     Alice  0.532136  22.962368  00:00:00\n",
       "2000-01-01 00:00:01  964  Patricia  0.004575  -0.344598  00:00:01"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "if ddf.colna.isnull().all().compute() == True:   # check if all values in column are Null -  expensive\n",
    "    ddf = ddf.drop(labels=['colna'],axis=1)\n",
    "ddf.head(2)"
   ]
  },
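  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When several columns may be empty, the per-column check above can be folded into a single pass. This is a minimal sketch, assuming a hypothetical helper name `drop_all_null_columns`; it is written to accept either a `Pandas` or a `Dask` dataframe, and is demonstrated here on a small `Pandas` frame only.\n",
    "\n",
    "```python\n",
    "import pandas as pd\n",
    "\n",
    "def drop_all_null_columns(df):\n",
    "    # Hypothetical helper: one boolean per column, True when every value is null\n",
    "    all_null = df.isnull().all()\n",
    "    # A Dask result is lazy; materialise it with a single compute() call\n",
    "    if hasattr(all_null, 'compute'):\n",
    "        all_null = all_null.compute()\n",
    "    empty_cols = list(all_null[all_null].index)\n",
    "    return df.drop(labels=empty_cols, axis=1)\n",
    "\n",
    "demo = pd.DataFrame({'x': [1.0, 2.0], 'colna': [None, None]})\n",
    "print(list(drop_all_null_columns(demo).columns))\n",
    "```\n",
    "\n",
    "With a `Dask` dataframe the scan is still expensive, but it runs once for all columns instead of once per column."
   ]
  },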
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##  1.4 Reset Index"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>2296.236839</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "    ID      name         x            y     times\n",
       "0  972     Alice  0.532136  2296.236839  00:00:00\n",
       "1  964  Patricia  0.004575    -0.344598  00:00:01"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Pandas\n",
    "pdf =pdf.reset_index(drop=True)\n",
    "pdf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "    ID      name         x          y     times\n",
       "0  972     Alice  0.532136  22.962368  00:00:00\n",
       "1  964  Patricia  0.004575  -0.344598  00:00:01"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Dask\n",
    "ddf = ddf.reset_index()\n",
    "ddf = ddf.drop(labels=['timestamp'], axis=1 )\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Read / Save files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* When working with `pandas` and `dask` preferable use [parquet format](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format).  \n",
    "* When working with `Dask` - files can be read with multiple workers .  \n",
    "* Most `kwargs` are applicable for reading and writing files   \n",
    "e.g. \n",
    "ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False).  \n",
    "* However some are not available such as  `nrows`.\n",
    "\n",
    "[see documentaion](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_csv) (including the option for output file naming)."
   ]
  },
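  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since `nrows` is not available in `dd.read_csv`, one possible substitute for a quick preview is to read lazily and call `head`. The sketch below is only an illustration under assumptions: the file `sample.csv`, its columns, and the temporary directory are hypothetical and are not part of this notebook's data.\n",
    "\n",
    "```python\n",
    "import tempfile\n",
    "from pathlib import Path\n",
    "\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "\n",
    "# Hypothetical sample file, written only for this illustration\n",
    "tmp_dir = Path(tempfile.mkdtemp())\n",
    "csv_path = tmp_dir / 'sample.csv'\n",
    "pd.DataFrame({'a': range(10), 'b': range(10, 20)}).to_csv(csv_path, index=False)\n",
    "\n",
    "# Pandas: nrows limits how many rows are parsed\n",
    "pdf_preview = pd.read_csv(csv_path, nrows=3)\n",
    "\n",
    "# Dask: build the lazy dataframe, then take the first rows with head()\n",
    "ddf_preview = dd.read_csv(str(csv_path)).head(3)\n",
    "\n",
    "print(pdf_preview)\n",
    "print(ddf_preview)\n",
    "```\n",
    "\n",
    "Note that `head` returns a regular `pandas` dataframe, so the preview is no longer lazy."
   ]
  },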
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Save files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pathlib import Path\n",
    "output_dir_file = Path('data/pdf_single_file.csv')\n",
    "output_dir_file.parent.mkdir(parents=True, exist_ok=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 22.1 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# Pandas\n",
    "pdf.to_csv(output_dir_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[WindowsPath('data/pdf_single_file.csv')]"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "list(output_dir_file.parent.glob('*.csv'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Dask`\n",
    "Notice the '*' to allow for multiple file renaming. \n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_dask_dir = Path('data/dask_multi_files/')\n",
    "output_dask_dir.mkdir(parents=True, exist_ok=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 13.7 s\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "['data\\\\dask_multi_files/ddf00.csv',\n",
       " 'data\\\\dask_multi_files/ddf01.csv',\n",
       " 'data\\\\dask_multi_files/ddf02.csv',\n",
       " 'data\\\\dask_multi_files/ddf03.csv',\n",
       " 'data\\\\dask_multi_files/ddf04.csv',\n",
       " 'data\\\\dask_multi_files/ddf05.csv',\n",
       " 'data\\\\dask_multi_files/ddf06.csv',\n",
       " 'data\\\\dask_multi_files/ddf07.csv',\n",
       " 'data\\\\dask_multi_files/ddf08.csv',\n",
       " 'data\\\\dask_multi_files/ddf09.csv',\n",
       " 'data\\\\dask_multi_files/ddf10.csv',\n",
       " 'data\\\\dask_multi_files/ddf11.csv',\n",
       " 'data\\\\dask_multi_files/ddf12.csv',\n",
       " 'data\\\\dask_multi_files/ddf13.csv',\n",
       " 'data\\\\dask_multi_files/ddf14.csv',\n",
       " 'data\\\\dask_multi_files/ddf15.csv',\n",
       " 'data\\\\dask_multi_files/ddf16.csv',\n",
       " 'data\\\\dask_multi_files/ddf17.csv',\n",
       " 'data\\\\dask_multi_files/ddf18.csv',\n",
       " 'data\\\\dask_multi_files/ddf19.csv',\n",
       " 'data\\\\dask_multi_files/ddf20.csv',\n",
       " 'data\\\\dask_multi_files/ddf21.csv',\n",
       " 'data\\\\dask_multi_files/ddf22.csv',\n",
       " 'data\\\\dask_multi_files/ddf23.csv',\n",
       " 'data\\\\dask_multi_files/ddf24.csv',\n",
       " 'data\\\\dask_multi_files/ddf25.csv',\n",
       " 'data\\\\dask_multi_files/ddf26.csv',\n",
       " 'data\\\\dask_multi_files/ddf27.csv',\n",
       " 'data\\\\dask_multi_files/ddf28.csv',\n",
       " 'data\\\\dask_multi_files/ddf29.csv']"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%time\n",
    "# Dask\n",
    "ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To find the number of partitions which will determine the number of output files use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.npartitions)  \n",
    "\n",
    "To change the number of output files use [repartition](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.repartition) which is an expensive operation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "30"
      ]
     },
     "execution_count": 36,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf.npartitions"
   ]
  },
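  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The relation between partitions and output files can be sketched on a toy dataframe. This is a minimal illustration, not the notebook's own data: `small_ddf`, the `part-*.csv` file pattern, and the temporary output directory are hypothetical names chosen for the example.\n",
    "\n",
    "```python\n",
    "import tempfile\n",
    "from pathlib import Path\n",
    "\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "\n",
    "# Small hypothetical dataframe split into 4 partitions\n",
    "small_ddf = dd.from_pandas(pd.DataFrame({'a': range(100)}), npartitions=4)\n",
    "\n",
    "# Reduce to 2 partitions so that to_csv writes 2 files\n",
    "fewer_ddf = small_ddf.repartition(npartitions=2)\n",
    "\n",
    "out_dir = Path(tempfile.mkdtemp())\n",
    "fewer_ddf.to_csv(str(out_dir / 'part-*.csv'), index=False)\n",
    "\n",
    "# One file per partition is expected in the output directory\n",
    "written_files = sorted(out_dir.glob('part-*.csv'))\n",
    "print(fewer_ddf.npartitions, len(written_files))\n",
    "```\n",
    "\n",
    "On a large dataframe the `repartition` step moves data between partitions, so it is worth doing only when the number of output files really matters."
   ]
  },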
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read Multiple files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For `pandas` it is possible to iterate and concat the files [see answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 6.58 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# Pandas\n",
    "concat_df = pd.concat([pd.read_csv(f) \n",
    "                       for f in list(output_dask_dir.iterdir())])\n",
    "len(concat_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 99.9 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "# Dask\n",
    "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n",
    "_ddf"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Remember that `Dask` is lazy - thus it does not *realy* read the file until it needs to..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 2.92 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n",
    "len(_ddf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " ## Consider using client.persist()\n",
    " Since Dask is lazy - it may run the **entire** graph/DAG (again) even if it already run part of the calculation in a previous cell.  Thus use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory  \n",
    "Additional information can be read in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an exampel in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes)   \n",
    "This concept should also  be used when running a code within a script (rather then a jupyter notebook) which incoperates loops within the code.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>times</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>972</td>\n",
       "      <td>Alice</td>\n",
       "      <td>0.532136</td>\n",
       "      <td>22.962368</td>\n",
       "      <td>00:00:00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>964</td>\n",
       "      <td>Patricia</td>\n",
       "      <td>0.004575</td>\n",
       "      <td>-0.344598</td>\n",
       "      <td>00:00:01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "    ID      name         x          y     times\n",
       "0  972     Alice  0.532136  22.962368  00:00:00\n",
       "1  964  Patricia  0.004575  -0.344598  00:00:01"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# e.g.\n",
    "_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')\n",
    "# do some filter\n",
    "_ddf = client.persist(_ddf)\n",
    "# do some computations\n",
    "_ddf.head(2)"
   ]
  },
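  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same idea can be sketched without a client. The example below assumes the collection method `persist()` with the default scheduler instead of `client.persist()`, and uses a hypothetical toy dataframe (`toy_ddf`) rather than the files written earlier.\n",
    "\n",
    "```python\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "\n",
    "# Hypothetical toy dataframe with 2 partitions\n",
    "toy_ddf = dd.from_pandas(pd.DataFrame({'x': range(10)}), npartitions=2)\n",
    "\n",
    "# Build a lazy filter, then persist it so later steps reuse the result\n",
    "filtered = toy_ddf[toy_ddf['x'] % 2 == 0].persist()\n",
    "\n",
    "# Both computations start from the persisted partitions\n",
    "# instead of re-running the filter\n",
    "total = filtered['x'].sum().compute()\n",
    "count = len(filtered)\n",
    "print(total, count)\n",
    "```\n",
    "\n",
    "Persisting keeps the partitions in memory, so it is best placed after filters that shrink the data and before steps that are repeated."
   ]
  },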
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Group By - custom aggregations\n",
    "In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/master/dataframes/02-groupby.ipynb) that is in the repository -  \n",
    "This is another example how to try to eliminate the use of `groupby.apply`.   \n",
    "In this example we are grouping columns into unique lists."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Pandas"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>name</th>\n",
       "      <th>ID</th>\n",
       "      <th>seconds</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Alice</td>\n",
       "      <td>972</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>Patricia</td>\n",
       "      <td>964</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>Edith</td>\n",
       "      <td>1025</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>Quinn</td>\n",
       "      <td>974</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>Kevin</td>\n",
       "      <td>1000</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "       name    ID seconds\n",
       "0     Alice   972      00\n",
       "1  Patricia   964      00\n",
       "2     Edith  1025      00\n",
       "3     Quinn   974      00\n",
       "4     Kevin  1000      00"
      ]
     },
     "execution_count": 41,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# prepare pandas dataframe\n",
    "pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)\n",
    "pdf['seconds'] = pdf.time.astype(str).str[-2:]\n",
    "cols_for_demo =['name', 'ID','seconds']\n",
    "pdf[cols_for_demo].head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [],
   "source": [
    "pdf_gb = pdf.groupby(pdf.name)\n",
    "gp_col = ['ID', 'seconds']\n",
    "list_ser_gb = [pdf_gb[att_col_gr].apply\n",
    "               (lambda x: list(set(x.to_list()))) \n",
    "               for att_col_gr in gp_col]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "       Weight                                                 ID  \\\n",
      "name                                                               \n",
      "Alice   99611  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...   \n",
      "Bob     99846  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...   \n",
      "\n",
      "                                                 seconds  \n",
      "name                                                      \n",
      "Alice  [70, 52, 37, 45, 40, 03, 56, 61, 04, 00, 62, 7...  \n",
      "Bob    [70, 52, 37, 45, 40, 03, 56, 61, 04, 00, 62, 7...  \n",
      "Wall time: 150 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df_edge_att = pdf_gb.size().to_frame(name=\"Weight\")\n",
    "for ser in list_ser_gb:\n",
    "        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')      \n",
    "print(df_edge_att.head(2))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* Remeber that in any some cases `Pandas` is more efficiante (assuming that you can load all the data into the RAM).  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Dask"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>name</th>\n",
       "      <th>ID</th>\n",
       "      <th>seconds</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Alice</td>\n",
       "      <td>972</td>\n",
       "      <td>00</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>Patricia</td>\n",
       "      <td>964</td>\n",
       "      <td>01</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "       name   ID seconds\n",
       "0     Alice  972      00\n",
       "1  Patricia  964      01"
      ]
     },
     "execution_count": 44,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "def set_list_att(x: dd.Series):\n",
    "        return list(set([item for item in x.values]))\n",
    "ddf['seconds'] = ddf.times.astype(str).str[-2:]\n",
    "ddf = client.persist(ddf)\n",
    "ddf[cols_for_demo].head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_gb = ddf.groupby(ddf.name)\n",
    "gp_col = ['ID', 'seconds']\n",
    "list_ser_gb = [df_gb[att_col_gr].apply(set_list_att\n",
    "                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) \n",
    "               for att_col_gr in gp_col]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 20 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n",
    "for ser in list_ser_gb:\n",
    "    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n",
    "df_edge_att.head(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can do better...   \n",
    "Using [dask custom aggregation](https://docs.dask.org/en/latest/dataframe-api.html?highlight=dropna#dask.dataframe.groupby.Aggregation) is consideribly better"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [],
   "source": [
    "import itertools\n",
    "custom_agg = dd.Aggregation(\n",
    "    'custom_agg', \n",
    "    lambda s: s.apply(set), \n",
    "    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Wall time: 2.98 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "df_gb = ddf.groupby(ddf.name)\n",
    "gp_col = ['ID', 'seconds']\n",
    "list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]\n",
    "df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n",
    "for ser in list_ser_gb:\n",
    "        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n",
    "df_edge_att.head(2)  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## [Debugging](https://docs.dask.org/en/latest/debugging.html)\n",
    "Debugging may be challenging...\n",
    "1. Run code without client \n",
    "2. Use Dashboard profiler\n",
    "3. Verify integrity of DAG"
   ]
  },
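  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to approach the first item in the list above is to run the computation with Dask's single-threaded `synchronous` scheduler, so that the code executes in the main thread where breakpoints and `%debug` are available. This is a minimal sketch under assumptions: `toy_ddf` and `double_x` are hypothetical names used only for illustration.\n",
    "\n",
    "```python\n",
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "\n",
    "# Hypothetical toy dataframe with 2 partitions\n",
    "toy_ddf = dd.from_pandas(pd.DataFrame({'x': [1.0, 2.0, 3.0, 4.0]}), npartitions=2)\n",
    "\n",
    "\n",
    "def double_x(df):\n",
    "    # Under the synchronous scheduler this runs in the main thread,\n",
    "    # so a debugger can step into it\n",
    "    return df.assign(x2=df['x'] * 2)\n",
    "\n",
    "\n",
    "# scheduler='synchronous' runs every task one after another in this process\n",
    "result = toy_ddf.map_partitions(double_x).compute(scheduler='synchronous')\n",
    "print(result['x2'].tolist())\n",
    "```\n",
    "\n",
    "Once the function behaves as expected on a small sample, the same code can be run again on the cluster."
   ]
  },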
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Corrupted DAG  \n",
    "In this example we show that once the DAG is currupted you may need to reset the calculation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01</th>\n",
       "      <td>1035</td>\n",
       "      <td>Xavier</td>\n",
       "      <td>0.066367</td>\n",
       "      <td>0.765965</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "              id    name         x         y\n",
       "timestamp                                   \n",
       "2000-01-01  1035  Xavier  0.066367  0.765965"
      ]
     },
     "execution_count": 50,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# reset dataframe\n",
    "ddf = dask.datasets.timeseries()\n",
    "ddf.head(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [],
   "source": [
    "def func_dist2(df, coor_x, coor_y):\n",
    "    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax\n",
    "                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax\n",
    "    return dist\n",
    "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
    "                                , meta=('float'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Is everything OK?"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "# Results in error\n",
    "ddf.head()\n",
    "\n",
    "---------------------------------------------------------------------------\n",
    "TypeError                                 Traceback (most recent call last)\n",
    "<ipython-input-67-837310ab3f2b> in <module>\n",
    "      1 # returns an error because of ^2 (needs to be **2)\n",
    "----> 2 ddf.head()\n",
    "\n",
    "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n",
    "    898 \n",
    "    899         if compute:\n",
    "--> 900             result = result.compute()\n",
    "    901         return result\n",
    "    902 \n",
    "\n",
    "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n",
    "    154         dask.base.compute\n",
    "    155         \"\"\"\n",
    "--> 156         (result,) = compute(self, traverse=False, **kwargs)\n",
    "    157         return result\n",
    "    158 \n",
    "\n",
    "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n",
    "\n",
    "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n",
    "\n",
    "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* Even if the function is corrected the DAG is corrupted"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Still results with an error\n",
    "def func_dist2(df, coor_x, coor_y):\n",
    "    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax\n",
    "                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax\n",
    "    return dist\n",
    "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
    "                                , meta=('float'))"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "# Still Results in error\n",
    "ddf.head()\n",
    "\n",
    "---------------------------------------------------------------------------\n",
    "TypeError                                 Traceback (most recent call last)\n",
    "<ipython-input-67-837310ab3f2b> in <module>\n",
    "      1 # returns an error because of ^2 (needs to be **2)\n",
    "----> 2 ddf.head()\n",
    "\n",
    "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\dataframe\\core.py in head(self, n, npartitions, compute)\n",
    "    898 \n",
    "    899         if compute:\n",
    "--> 900             result = result.compute()\n",
    "    901         return result\n",
    "    902 \n",
    "\n",
    "c:\\users\\jsber\\.virtualenvs\\dask-examples-3r4mgfnb\\lib\\site-packages\\dask\\base.py in compute(self, **kwargs)\n",
    "    154         dask.base.compute\n",
    "    155         \"\"\"\n",
    "--> 156         (result,) = compute(self, traverse=False, **kwargs)\n",
    "    157         return result\n",
    "    158 \n",
    "\n",
    "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n",
    "\n",
    "pandas\\_libs\\ops.pyx in pandas._libs.ops.vec_binop()\n",
    "\n",
    "TypeError: unsupported operand type(s) for ^: 'float' and 'bool'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We need to reset the dataframe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "      <th>col</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>timestamp</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:00</th>\n",
       "      <td>995</td>\n",
       "      <td>Jerry</td>\n",
       "      <td>-0.464037</td>\n",
       "      <td>0.255622</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-01 00:00:01</th>\n",
       "      <td>973</td>\n",
       "      <td>Frank</td>\n",
       "      <td>-0.446939</td>\n",
       "      <td>0.055328</td>\n",
       "      <td>0.201022</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                      id   name         x         y       col\n",
       "timestamp                                                    \n",
       "2000-01-01 00:00:00  995  Jerry -0.464037  0.255622       NaN\n",
       "2000-01-01 00:00:01  973  Frank -0.446939  0.055328  0.201022"
      ]
     },
     "execution_count": 53,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ddf = dask.datasets.timeseries()\n",
    "def func_dist2(df, coor_x, coor_y):\n",
    "    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function\n",
    "                     +  (df[coor_y] - df[coor_y].shift())**2 )\n",
    "    return dist\n",
    "ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
    "                                , meta=('float'))\n",
    "ddf.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "file_extension": ".py",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  },
  "mimetype": "text/x-python",
  "name": "python",
  "npconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": 3
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
